# -*- coding: utf-8 -*-
from datetime import timedelta
from utils.operators.spark_submit import SparkSubmitOperator
from jms.dm.route.dm_route_whole_base_dt import dm__dm_route_whole_base_dt
from airflow.operators.dummy_operator import DummyOperator

# Spark job that runs the SelectedRouteAllRecommend class, passing the
# templated execution date as its single argument.
jms_dm__dm_route_recommend_all_dt = SparkSubmitOperator(
    # --- Airflow task settings ---
    task_id='jms_dm__dm_route_recommend_all_dt',
    conn_id='spark_route',
    pool_slots=10,
    execution_timeout=timedelta(hours=2),
    email=['zhangqinglin@jtexpress.com', 'yl_bigdata@yl-scm.com'],
    # --- Spark application ---
    name='jms_dm__dm_route_recommend_all_dt',
    java_class='com.yunlu.bigdata.jobs.route.export.SelectedRouteAllRecommend',
    application='hdfs:///route/package/SelectedRouteAll/original-jobs-1.0-SNAPSHOT.jar',
    # NOTE(review): `cst_ds` is a project-defined template filter; presumably
    # it renders the execution date as a CST date string -- confirm.
    application_args=['{{ execution_date | cst_ds }}'],
    # Comma-separated list of extra jars (adjacent literals join into one string).
    jars=(
        'hdfs:///route/package/graphframes-0.7.0-spark2.3-s_2.11.jar,'
        'hdfs:///route/package/common-1.0-SNAPSHOT.jar'
    ),
    # --- Resource sizing ---
    driver_memory='5G',
    executor_memory='21G',
    executor_cores=8,
    num_executors=100,
    # --- Spark configuration overrides ---
    conf={
        # Turn on dynamic resource allocation.
        'spark.dynamicAllocation.enabled': 'true',
        # Shuffle service enabled for dynamic allocation.
        'spark.shuffle.service.enabled': 'true',
        # Upper bound on executors when dynamic allocation scales up.
        'spark.dynamicAllocation.maxExecutors': 120,
        # Timeout (seconds) after which idle executors are released automatically.
        'spark.dynamicAllocation.cachedExecutorIdleTimeout': 60,
        # Allow already-existing partitions to be overwritten.
        'spark.sql.sources.partitionOverwriteMode': 'dynamic',
        # Off-heap memory overhead per executor.
        'spark.executor.memoryOverhead': '6G',
        'spark.sql.shuffle.partitions': 1200,
        'spark.executor.extraJavaOptions': '-XX:+UseG1GC -XX:ParallelGCThreads=5',
    },
)
# dm__dm_route_whole_base_dt is registered as the upstream of this task.
jms_dm__dm_route_recommend_all_dt << dm__dm_route_whole_base_dt


# Spark job that runs the SelectedRouteAllLastest class, passing the
# templated execution date as its single argument.
jms_dm__dm_route_lastest_all_dt = SparkSubmitOperator(
    # --- Airflow task settings ---
    task_id='jms_dm__dm_route_lastest_all_dt',
    conn_id='spark_route',
    pool_slots=10,
    execution_timeout=timedelta(hours=2),
    email=['zhangqinglin@jtexpress.com', 'yl_bigdata@yl-scm.com'],
    # --- Spark application ---
    name='jms_dm__dm_route_lastest_all_dt',
    java_class='com.yunlu.bigdata.jobs.route.export.SelectedRouteAllLastest',
    application='hdfs:///route/package/SelectedRouteAll/original-jobs-1.0-SNAPSHOT.jar',
    # NOTE(review): `cst_ds` is a project-defined template filter; presumably
    # it renders the execution date as a CST date string -- confirm.
    application_args=['{{ execution_date | cst_ds }}'],
    # Comma-separated list of extra jars (adjacent literals join into one string).
    jars=(
        'hdfs:///route/package/graphframes-0.7.0-spark2.3-s_2.11.jar,'
        'hdfs:///route/package/common-1.0-SNAPSHOT.jar'
    ),
    # --- Resource sizing ---
    driver_memory='5G',
    executor_memory='21G',
    executor_cores=8,
    num_executors=100,
    # --- Spark configuration overrides ---
    conf={
        # Turn on dynamic resource allocation.
        'spark.dynamicAllocation.enabled': 'true',
        # Shuffle service enabled for dynamic allocation.
        'spark.shuffle.service.enabled': 'true',
        # Upper bound on executors when dynamic allocation scales up.
        'spark.dynamicAllocation.maxExecutors': 120,
        # Timeout (seconds) after which idle executors are released automatically.
        'spark.dynamicAllocation.cachedExecutorIdleTimeout': 60,
        # Allow already-existing partitions to be overwritten.
        'spark.sql.sources.partitionOverwriteMode': 'dynamic',
        # Off-heap memory overhead per executor.
        'spark.executor.memoryOverhead': '6G',
        'spark.sql.shuffle.partitions': 1200,
        'spark.executor.extraJavaOptions': '-XX:+UseG1GC -XX:ParallelGCThreads=5',
    },
)
# dm__dm_route_whole_base_dt is registered as the upstream of this task.
jms_dm__dm_route_lastest_all_dt << dm__dm_route_whole_base_dt


# Placeholder (DummyOperator) task that sits downstream of both route export
# Spark jobs defined in this file.
jms_dm__dm_route_selected_all_dt = DummyOperator(
    task_id='jms_dm__dm_route_selected_all_dt',
    email=['zhangqinglin@jtexpress.com', 'yl_bigdata@yl-scm.com'],
    priority_weight=0,
    pool='unlimited_pool',
)
# Both Spark exports are wired in as upstream tasks of the placeholder.
jms_dm__dm_route_selected_all_dt << [
    jms_dm__dm_route_recommend_all_dt,
    jms_dm__dm_route_lastest_all_dt,
]